Cloud Composer で DAG の多重起動を防ぎたい
こんにちは!エノカワです。
Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。
やりたいこと
データパイプラインの管理において、同じDAGが同時に多重に実行されると、リソースの消費が増えます。
また、データの整合性を保つための困難さが増します。
とくに処理に時間を要するタスクが含まれる場合、同じDAGが複数回実行されると予期しない結果が生じることがあります。
今回はmax_active_runs
パラメータを使用してDAGの多重起動を防止する方法をご紹介します。
max_active_runs
max_active_runs
はDAG設定の一部で、同時に実行できるDAGの最大数を設定します。
この値を'1'に設定すると、同時に2つ以上のDAGインスタンスが実行されることはありません。
これにより、前のDAG実行が完了するまで、次のDAG実行が待機状態になります。
それでは、max_active_runs
を使ったDAGを作成し、Cloud Composer で動かしてみましょう。
Cloud Composer 環境を作成
DAGを動かす Cloud Composer 環境を作成します。
Google Cloud コンソールで Cloud Composer の[環境の作成]ページに移動し、Cloud Composer 環境を作成します。
test-composer
という名前で、東京リージョン、最新のイメージバージョンを選択し、サービスアカウントなど他はでデフォルトのままで作成しました。
DAG を作成
Cloud Composer 環境で実行する DAGを作成します。
以下は、DAGが多重起動する可能性がある例です。
このDAGでは、2分ごとにスケジュールされ、スリープタスクが3分間スリープするため、次のスケジュール時に前のDAG実行がまだ実行中である可能性があります。
from datetime import timedelta import time from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago def sleep_for_a_while(): time.sleep(180) # 3分間スリープ with DAG( dag_id="sleep_for_a_while", start_date=days_ago(1), schedule_interval=timedelta(minutes=2), # DAGは2分ごとに実行 catchup=False, ) as dag: # DAGの開始を示すタスク start_operator = DummyOperator( task_id="run_this_first" ) # スリープするタスク sleep_operator = PythonOperator( task_id='sleep', python_callable=sleep_for_a_while ) # DAGの終了を示すタスク end_operator = DummyOperator( task_id="run_this_last" ) # 開始タスク -> スリープタスク -> 終了タスク start_operator >> sleep_operator >> end_operator
test-composer
の[DAG]リンクからDAGフォルダに移動します。
DAGフォルダにファイルをアップロードしてDAGをデプロイします。
DAG を実行
先ほどデプロイしたDAGを実際に動かしてみて、DAGが多重起動する場合の挙動を確認してみましょう。
DAGが多重起動する場合
デプロイ直後、最初のDAGが動き始めました。
スリープタスクが実行中の状況です。
最初のDAG実行から2分後、前のDAGがまだ完了していないにもかかわらず新たなDAGが実行を開始しました。
これが多重起動の状況です。
さらに2分後、新たなDAGが実行を開始しました。
DAGが多重起動しない場合
次に、DAGが多重起動しない場合の挙動を確認してみましょう。
先ほどのDAGにmax_active_runs=1
を設定してデプロイします。
タスクの内容は先ほどのDAGと同じです。
from datetime import timedelta import time from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago def sleep_for_a_while(): time.sleep(180) # 3分間スリープ with DAG( dag_id="sleep_for_a_while_max_active_runs", start_date=days_ago(1), schedule_interval=timedelta(minutes=2), # DAGは2分ごとに実行 max_active_runs=1, catchup=False, ) as dag: # DAGの開始を示すタスク start_operator = DummyOperator( task_id="run_this_first" ) # スリープするタスク sleep_operator = PythonOperator( task_id='sleep', python_callable=sleep_for_a_while ) # DAGの終了を示すタスク end_operator = DummyOperator( task_id="run_this_last" ) # 開始タスク -> スリープタスク -> 終了タスク start_operator >> sleep_operator >> end_operator
デプロイ直後、最初のDAGが動き始めました。
スリープタスクが実行中の状況です。先ほどのDAGと同じ挙動ですね。
最初のDAG実行から3分後、前のDAGが完了してから新たなDAGが実行を開始しました。
本来のスケジュールでは2分ごとの実行ですが、max_active_runs=1
の設定により次のDAG実行が待機されていたようです。
2番目のDAGの実行が完了した後、3番目のDAGが実行を開始しました。
同時に2つ以上のDAGが実行されない制御されているようです。
DAGの設定にmax_active_runs=1
を追加することで、多重起動を防止することができました!
max_active_runs_per_dag
max_active_runs_per_dag
は、すべてのDAGに対する同時実行可能なDAGインスタンスの最大数を設定します。
Airflow の構成(airflow.cfg
)で定義されています。確認してみましょう。
[Admin] - [Configurations] をクリックします。
max_active_runs_per_dag
の値は '25' で定義されています。
先ほどのDAGで設定したmax_active_runs
は、max_active_runs_per_dag
の値を上書きします。
max_active_runs
が未設定のDAGは、max_active_runs_per_dag
の値('25')が設定されていることになります。
設定値の変更
max_active_runs_per_dag
は、以下のコマンドで設定を行うことができます。
$ gcloud composer environments update test-composer \ --location asia-northeast1 \ --update-airflow-configs=core-max_active_runs_per_dag=1
以下のコマンドで設定を削除することができます。
$ gcloud composer environments update test-composer \ --location asia-northeast1 \ --remove-airflow-configs=core-max_active_runs_per_dag
まとめ
以上、max_active_runs
パラメータを使用してDAGの多重起動を防止する方法をご紹介しました。
max_active_runs
を使用することで、システムの過負荷を避けたり、データの整合性を保つといった課題に対処することができます。
一方で、max_active_runs
の値が低く設定されていると、新しいDAG実行の開始までの待ち時間が不必要に長くなり、効率性が損なわれる可能性があります。
max_active_runs
の値は、システムのリソース、タスクの実行時間、DAGのスケジューリング間隔等を考慮して適切に設定する必要がありますので、設定の意味や挙動を理解してパラメータチューニングを行うことが重要ですね。
今回の検証が誰かのお役に立てれば幸いです!
参考
- Scaling Airflow to optimize performance | Astronomer Documentation
- Introduction to Airflow max_active_runs parameter for efficient task scheduling - Naiveskill
- Configuration Reference — Airflow Documentation
- gcloud composer environments update | Google Cloud CLI Documentation
- Cloud Composer の概要 | Google Cloud
- Cloud Composer 環境を作成する | Google Cloud